fix(kafka): [Queue Instrumentation 36] Avoid dropping customer interceptor#5341
fix(kafka): [Queue Instrumentation 36] Avoid dropping customer interceptor#5341adinauer wants to merge 3 commits intofeat/kafka-producer-wrapperfrom
Conversation
Replace the concrete `implements Producer<K,V>` class with a `Proxy.newProxyInstance`-based wrapper that intercepts only the two `send()` overloads and forwards every other method reflectively to the delegate. The concrete class required explicitly delegating every method on the `Producer` interface, coupling the wrapper to a specific Kafka version: `clientInstanceId(Duration)` was added in Kafka 3.7, and the deprecated `sendOffsetsToTransaction(Map, String)` was removed in Kafka 4.0. The dynamic proxy has no such coupling — new or removed interface methods are handled automatically, giving full compatibility across all Kafka client versions. Public API change: `SentryKafkaProducer` is now a utility class with static `wrap()` overloads instead of constructors. Callers wrap a producer with `SentryKafkaProducer.wrap(producer)`. The Spring BPP and console sample are updated accordingly. Co-Authored-By: Claude <noreply@anthropic.com>
When ProducerFactory.addPostProcessor() is a no-op (the interface default), the Sentry post-processor is silently dropped and the customer gets zero producer tracing with no signal. Verify registration succeeded via getPostProcessors() after each addPostProcessor() call, and log a WARNING naming the factory bean and pointing toward SentryKafkaProducer.wrap() as the manual fallback. Co-Authored-By: Claude <noreply@anthropic.com>
If reading recordInterceptor via reflection fails, leave the container\nfactory untouched instead of installing Sentry's interceptor with a\nnull delegate. This avoids silently dropping customer-configured\ninterceptors for DLQ routing, auditing, or other message handling\nconcerns.\n\nAdd tests that preserve customer interceptors both when chaining\nsucceeds and when reflection cannot safely determine the existing\ninterceptor.\n\nCo-Authored-By: Claude <noreply@anthropic.com>
📲 Install BuildsAndroid
|
| "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " | ||
| + "existing recordInterceptor via reflection. Refusing to install Sentry's " | ||
| + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", | ||
| e, | ||
| beanName); |
There was a problem hiding this comment.
Bug: The log call in the catch block for InterceptorReadFailedException has its arguments in the wrong order, causing the exception's stack trace to be lost.
Severity: MEDIUM
Suggested Fix
Reorder the arguments in the .log() call to match the correct signature for logging an exception with a formatted message. The correct order should be log(SentryLevel.ERROR, e, "...message...", beanName).
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location:
sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaConsumerBeanPostProcessor.java#L49-L53
Potential issue: The logger call in `SentryKafkaConsumerBeanPostProcessor` uses the
`log(level, message, args...)` signature but provides arguments in the order expected by
`log(level, throwable, message, args...)`. When a reflection error occurs while
inspecting a Kafka consumer factory, the `InterceptorReadFailedException` is passed as a
format argument instead of a throwable. This results in the exception's `toString()`
value being logged in the message, the `beanName` being ignored, and the full stack
trace being lost, which hinders debugging.
Did we get this right? 👍 / 👎 to inform future reviews.
There was a problem hiding this comment.
yep, should be .log(SentryLevel.ERROR, e, "Message", beanName)
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit c1ccbf7. Configure here.
| + "existing recordInterceptor via reflection. Refusing to install Sentry's " | ||
| + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", | ||
| e, | ||
| beanName); |
There was a problem hiding this comment.
Logger arguments swapped: exception used as format arg
Medium Severity
The log call passes the exception e as the third argument and beanName as the fourth, matching the log(SentryLevel, String, Object...) overload. This causes the %s placeholder to be filled with e.toString() instead of the factory's beanName, and the exception stack trace is never logged. The intended overload is log(SentryLevel, Throwable, String, Object...), which requires the throwable to come before the message string: .log(SentryLevel.ERROR, e, "...factory '%s'...", beanName).
Reviewed by Cursor Bugbot for commit c1ccbf7. Configure here.
|
|
||
| val installed = field.get(factory) | ||
| val effective = | ||
| if (installed is SentryKafkaRecordInterceptor<*, *>) { |
There was a problem hiding this comment.
This makes the test succeed in any case, whether or not the interceptor was installed. The intention of the test, however, is to verify that the interceptor is not installed, correct?


PR Stack (Queue Instrumentation)
📜 Description
Avoid installing
SentryKafkaRecordInterceptorwhen the bean post-processor cannot safely read the existingrecordInterceptorfield fromAbstractKafkaListenerContainerFactory.Before this change, reflection failures fell back to
existing = null, and the post-processor still calledsetRecordInterceptor(sentryInterceptor). That could silently overwrite a customer-configured interceptor.After this change:
getExistingInterceptorthrows a dedicated exception when reflection failspostProcessAfterInitializationlogs an error and leaves the bean untouched💡 Motivation and Context
Addresses review finding R10-F003: reflection fallback silently drops the customer's existing
RecordInterceptor.Customers often use
RecordInterceptorfor DLQ routing, auditing, or other message handling behavior. If Sentry cannot safely discover the existing interceptor, it must disable consumer tracing for that factory instead of overwriting customer behavior.💚 How did you test it?
./gradlew spotlessApply apiDump./gradlew ':sentry-spring-jakarta:test' --tests "*SentryKafkaConsumerBeanPostProcessor*"📝 Checklist
sendDefaultPIIis enabled.🔮 Next steps
Continue addressing the remaining Queue Instrumentation review findings.
#skip-changelog